/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.gcp.util;



import com.google.api.client.googleapis.batch.BatchRequest;

import com.google.api.client.googleapis.batch.json.JsonBatchCallback;

import com.google.api.client.googleapis.json.GoogleJsonError;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;

import com.google.api.client.http.HttpHeaders;

import com.google.api.client.http.HttpRequestInitializer;

import com.google.api.client.util.BackOff;

import com.google.api.client.util.Sleeper;

import com.google.api.services.storage.Storage;

import com.google.api.services.storage.model.Bucket;

import com.google.api.services.storage.model.Objects;

import com.google.api.services.storage.model.RewriteResponse;

import com.google.api.services.storage.model.StorageObject;

import com.google.auto.value.AutoValue;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;

import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;

import com.google.cloud.hadoop.util.*;

import com.bff.gaia.unified.sdk.extensions.gcp.options.GcsOptions;

import com.bff.gaia.unified.sdk.extensions.gcp.util.gcsfs.GcsPath;

import com.bff.gaia.unified.sdk.options.DefaultValueFactory;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.util.FluentBackoff;

import com.bff.gaia.unified.sdk.util.MoreFutures;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import com.bff.gaia.unified.vendor.guava.com.google.common.util.concurrent.MoreExecutors;

import org.joda.time.Duration;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.io.FileNotFoundException;

import java.io.IOException;

import java.nio.channels.SeekableByteChannel;

import java.nio.channels.WritableByteChannel;

import java.nio.file.AccessDeniedException;

import java.nio.file.FileAlreadyExistsException;

import java.util.*;

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.regex.Matcher;

import java.util.regex.Pattern;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



/** Provides operations on GCS. */

public class GcsUtil {

  /**

   * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport

   * flags specified on the {@link PipelineOptions}.

   */

  public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {

    /**

     * Returns an instance of {@link GcsUtil} based on the {@link PipelineOptions}.

     *

     * <p>If no instance has previously been created, one is created and the value stored in {@code

     * options}.

     */

    @Override

    public GcsUtil create(PipelineOptions options) {

      LOG.debug("Creating new GcsUtil");

      GcsOptions gcsOptions = options.as(GcsOptions.class);

      Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);

      return new GcsUtil(

          storageBuilder.build(),

          storageBuilder.getHttpRequestInitializer(),

          gcsOptions.getExecutorService(),

          gcsOptions.getGcsUploadBufferSizeBytes());

    }



    /** Returns an instance of {@link GcsUtil} based on the given parameters. */

    public static GcsUtil create(

        Storage storageClient,

        HttpRequestInitializer httpRequestInitializer,

        ExecutorService executorService,

        @Nullable Integer uploadBufferSizeBytes) {

      return new GcsUtil(

          storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);

    }

  }



  private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);



  /** Maximum number of items to retrieve per Objects.List request. */

  private static final long MAX_LIST_ITEMS_PER_CALL = 1024;



  /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */

  private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");



  /** Maximum number of requests permitted in a GCS batch request. */

  private static final int MAX_REQUESTS_PER_BATCH = 100;

  /** Maximum number of concurrent batches of requests executing on GCS. */

  private static final int MAX_CONCURRENT_BATCHES = 256;



  private static final FluentBackoff BACKOFF_FACTORY =

      FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1));



  /////////////////////////////////////////////////////////////////////////////



  /** Client for the GCS API. */

  private Storage storageClient;



  private final HttpRequestInitializer httpRequestInitializer;

  /** Buffer size for GCS uploads (in bytes). */

  @Nullable private final Integer uploadBufferSizeBytes;



  // Helper delegate for turning IOExceptions from API calls into higher-level semantics.

  private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();



  // Unbounded thread pool for codependent pipeline operations that will deadlock the pipeline if

  // starved for threads.

  // Exposed for testing.

  final ExecutorService executorService;



  /** Rewrite operation setting. For testing purposes only. */

  @VisibleForTesting @Nullable Long maxBytesRewrittenPerCall;



  @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed;



  /** Returns the prefix portion of the glob that doesn't contain wildcards. */

  public static String getNonWildcardPrefix(String globExp) {

    Matcher m = GLOB_PREFIX.matcher(globExp);

    checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp));

    return m.group("PREFIX");

  }



  /**

   * Expands glob expressions to regular expressions.

   *

   * @param globExp the glob expression to expand

   * @return a string with the regular expression this glob expands to

   */

  public static String wildcardToRegexp(String globExp) {

    StringBuilder dst = new StringBuilder();

    char[] src = globExp.replace("**/*", "**").toCharArray();

    int i = 0;

    while (i < src.length) {

      char c = src[i++];

      switch (c) {

        case '*':

          // One char lookahead for **

          if (i < src.length && src[i] == '*') {

            dst.append(".*");

            ++i;

          } else {

            dst.append("[^/]*");

          }

          break;

        case '?':

          dst.append("[^/]");

          break;

        case '.':

        case '+':

        case '{':

        case '}':

        case '(':

        case ')':

        case '|':

        case '^':

        case '$':

          // These need to be escaped in regular expressions

          dst.append('\\').append(c);

          break;

        case '\\':

          i = doubleSlashes(dst, src, i);

          break;

        default:

          dst.append(c);

          break;

      }

    }

    return dst.toString();

  }



  /** Returns true if the given {@code spec} contains wildcard. */

  public static boolean isWildcard(GcsPath spec) {

    return GLOB_PREFIX.matcher(spec.getObject()).matches();

  }



  private GcsUtil(

      Storage storageClient,

      HttpRequestInitializer httpRequestInitializer,

      ExecutorService executorService,

      @Nullable Integer uploadBufferSizeBytes) {

    this.storageClient = storageClient;

    this.httpRequestInitializer = httpRequestInitializer;

    this.uploadBufferSizeBytes = uploadBufferSizeBytes;

    this.executorService = executorService;

    this.maxBytesRewrittenPerCall = null;

    this.numRewriteTokensUsed = null;

  }



  // Use this only for testing purposes.

  protected void setStorageClient(Storage storageClient) {

    this.storageClient = storageClient;

  }



  /**

   * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded in

   * the result. For patterns that only match a single object, we ensure that the object exists.

   */

  public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {

    Pattern p = null;

    String prefix = null;

    if (isWildcard(gcsPattern)) {

      // Part before the first wildcard character.

      prefix = getNonWildcardPrefix(gcsPattern.getObject());

      p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));

    } else {

      // Not a wildcard.

      try {

        // Use a get request to fetch the metadata of the object, and ignore the return value.

        // The request has strong global consistency.

        getObject(gcsPattern);

        return ImmutableList.of(gcsPattern);

      } catch (FileNotFoundException e) {

        // If the path was not found, return an empty list.

        return ImmutableList.of();

      }

    }



    LOG.debug(

        "matching files in bucket {}, prefix {} against pattern {}",

        gcsPattern.getBucket(),

        prefix,

        p.toString());



    String pageToken = null;

    List<GcsPath> results = new ArrayList<>();

    do {

      Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);

      if (objects.getItems() == null) {

        break;

      }



      // Filter objects based on the regex.

      for (StorageObject o : objects.getItems()) {

        String name = o.getName();

        // Skip directories, which end with a slash.

        if (p.matcher(name).matches() && !name.endsWith("/")) {

          LOG.debug("Matched object: {}", name);

          results.add(GcsPath.fromObject(o));

        }

      }

      pageToken = objects.getNextPageToken();

    } while (pageToken != null);



    return results;

  }



  @VisibleForTesting

  @Nullable

  Integer getUploadBufferSizeBytes() {

    return uploadBufferSizeBytes;

  }



  private static BackOff createBackOff() {

    return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());

  }



  /**

   * Returns the file size from GCS or throws {@link FileNotFoundException} if the resource does not

   * exist.

   */

  public long fileSize(GcsPath path) throws IOException {

    return getObject(path).getSize().longValue();

  }



  /** Returns the {@link StorageObject} for the given {@link GcsPath}. */

  public StorageObject getObject(GcsPath gcsPath) throws IOException {

    return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);

  }



  @VisibleForTesting

  StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {

    Storage.Objects.Get getObject =

        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());

    try {

      return ResilientOperation.retry(

          ResilientOperation.getGoogleRequestCallable(getObject),

          backoff,

          RetryDeterminer.SOCKET_ERRORS,

          IOException.class,

          sleeper);

    } catch (IOException | InterruptedException e) {

      if (e instanceof InterruptedException) {

        Thread.currentThread().interrupt();

      }

      if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {

        throw new FileNotFoundException(gcsPath.toString());

      }

      throw new IOException(

          String.format("Unable to get the file object for path %s.", gcsPath), e);

    }

  }



  /**

   * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given {@link

   * GcsPath GcsPaths}.

   */

  public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException {

    List<StorageObjectOrIOException[]> results = new ArrayList<>();

    executeBatches(makeGetBatches(gcsPaths, results));

    ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder();

    for (StorageObjectOrIOException[] result : results) {

      ret.add(result[0]);

    }

    return ret.build();

  }



  /** Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. */

  public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)

      throws IOException {

    // List all objects that start with the prefix (including objects in sub-directories).

    Storage.Objects.List listObject = storageClient.objects().list(bucket);

    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);

    listObject.setPrefix(prefix);



    if (pageToken != null) {

      listObject.setPageToken(pageToken);

    }



    try {

      return ResilientOperation.retry(

          ResilientOperation.getGoogleRequestCallable(listObject),

          createBackOff(),

          RetryDeterminer.SOCKET_ERRORS,

          IOException.class);

    } catch (Exception e) {

      throw new IOException(

          String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), e);

    }

  }



  /**

   * Returns the file size from GCS or throws {@link FileNotFoundException} if the resource does not

   * exist.

   */

  @VisibleForTesting

  List<Long> fileSizes(List<GcsPath> paths) throws IOException {

    List<StorageObjectOrIOException> results = getObjects(paths);



    ImmutableList.Builder<Long> ret = ImmutableList.builder();

    for (StorageObjectOrIOException result : results) {

      ret.add(toFileSize(result));

    }

    return ret.build();

  }



  private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException)

      throws IOException {

    if (storageObjectOrIOException.ioException() != null) {

      throw storageObjectOrIOException.ioException();

    } else {

      return storageObjectOrIOException.storageObject().getSize().longValue();

    }

  }



  /**

   * Opens an object in GCS.

   *

   * <p>Returns a SeekableByteChannel that provides access to data in the bucket.

   *

   * @param path the GCS filename to read from

   * @return a SeekableByteChannel that can read the object data

   */

  public SeekableByteChannel open(GcsPath path) throws IOException {

    return new GoogleCloudStorageReadChannel(

        storageClient,

        path.getBucket(),

        path.getObject(),

        errorExtractor,

        new ClientRequestHelper<>());

  }



  /**

   * Creates an object in GCS.

   *

   * <p>Returns a WritableByteChannel that can be used to write data to the object.

   *

   * @param path the GCS file to write to

   * @param type the type of object, eg "text/plain".

   * @return a Callable object that encloses the operation.

   */

  public WritableByteChannel create(GcsPath path, String type) throws IOException {

    return create(path, type, uploadBufferSizeBytes);

  }



  /**

   * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding {code

   * uploadBufferSizeBytes}.

   */

  public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)

      throws IOException {

    GoogleCloudStorageWriteChannel channel =

        new GoogleCloudStorageWriteChannel(

            executorService,

            storageClient,

            new ClientRequestHelper<>(),

            path.getBucket(),

            path.getObject(),

            type,

            /* kmsKeyName= */ null,

            AsyncWriteChannelOptions.newBuilder().build(),

            new ObjectWriteConditions(),

            Collections.emptyMap());

    if (uploadBufferSizeBytes != null) {

      channel.setUploadBufferSize(uploadBufferSizeBytes);

    }

    channel.initialize();

    return channel;

  }



  /** Returns whether the GCS bucket exists and is accessible. */

  public boolean bucketAccessible(GcsPath path) throws IOException {

    return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT);

  }



  /**

   * Returns the project number of the project which owns this bucket. If the bucket exists, it must

   * be accessible otherwise the permissions exception will be propagated. If the bucket does not

   * exist, an exception will be thrown.

   */

  public long bucketOwner(GcsPath path) throws IOException {

    return getBucket(path, createBackOff(), Sleeper.DEFAULT).getProjectNumber().longValue();

  }



  /**

   * Creates a {@link Bucket} under the specified project in Cloud Storage or propagates an

   * exception.

   */

  public void createBucket(String projectId, Bucket bucket) throws IOException {

    createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);

  }



  /**

   * Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due

   * to permissions.

   */

  @VisibleForTesting

  boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {

    try {

      return getBucket(path, backoff, sleeper) != null;

    } catch (AccessDeniedException | FileNotFoundException e) {

      return false;

    }

  }



  @VisibleForTesting

  @Nullable

  Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {

    Storage.Buckets.Get getBucket = storageClient.buckets().get(path.getBucket());



    try {

      return ResilientOperation.retry(

          ResilientOperation.getGoogleRequestCallable(getBucket),

          backoff,

          new RetryDeterminer<IOException>() {

            @Override

            public boolean shouldRetry(IOException e) {

              if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {

                return false;

              }

              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);

            }

          },

          IOException.class,

          sleeper);

    } catch (GoogleJsonResponseException e) {

      if (errorExtractor.accessDenied(e)) {

        throw new AccessDeniedException(path.toString(), null, e.getMessage());

      }

      if (errorExtractor.itemNotFound(e)) {

        throw new FileNotFoundException(e.getMessage());

      }

      throw e;

    } catch (InterruptedException e) {

      Thread.currentThread().interrupt();

      throw new IOException(

          String.format(

              "Error while attempting to verify existence of bucket gs://%s", path.getBucket()),

          e);

    }

  }



  @VisibleForTesting

  void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)

      throws IOException {

    Storage.Buckets.Insert insertBucket = storageClient.buckets().insert(projectId, bucket);

    insertBucket.setPredefinedAcl("projectPrivate");

    insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");



    try {

      ResilientOperation.retry(

          ResilientOperation.getGoogleRequestCallable(insertBucket),

          backoff,

          new RetryDeterminer<IOException>() {

            @Override

            public boolean shouldRetry(IOException e) {

              if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) {

                return false;

              }

              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);

            }

          },

          IOException.class,

          sleeper);

      return;

    } catch (GoogleJsonResponseException e) {

      if (errorExtractor.accessDenied(e)) {

        throw new AccessDeniedException(bucket.getName(), null, e.getMessage());

      }

      if (errorExtractor.itemAlreadyExists(e)) {

        throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage());

      }

      throw e;

    } catch (InterruptedException e) {

      Thread.currentThread().interrupt();

      throw new IOException(

          String.format(

              "Error while attempting to create bucket gs://%s for rproject %s",

              bucket.getName(), projectId),

          e);

    }

  }



  private static void executeBatches(List<BatchRequest> batches) throws IOException {

    ExecutorService executor =

        MoreExecutors.listeningDecorator(

            new ThreadPoolExecutor(

                MAX_CONCURRENT_BATCHES,

                MAX_CONCURRENT_BATCHES,

                0L,

                TimeUnit.MILLISECONDS,

                new LinkedBlockingQueue<>()));



    List<CompletionStage<Void>> futures = new ArrayList<>();

    for (final BatchRequest batch : batches) {

      futures.add(MoreFutures.runAsync(() -> batch.execute(), executor));

    }



    try {

      MoreFutures.get(MoreFutures.allAsList(futures));

    } catch (InterruptedException e) {

      Thread.currentThread().interrupt();

      throw new IOException("Interrupted while executing batch GCS request", e);

    } catch (ExecutionException e) {

      if (e.getCause() instanceof FileNotFoundException) {

        throw (FileNotFoundException) e.getCause();

      }

      throw new IOException("Error executing batch GCS request", e);

    } finally {

      executor.shutdown();

    }

  }



  /**

   * Makes get {@link BatchRequest BatchRequests}.

   *

   * @param paths {@link GcsPath GcsPaths}.

   * @param results mutable {@link List} for return values.

   * @return {@link BatchRequest BatchRequests} to execute.

   * @throws IOException

   */

  @VisibleForTesting

  List<BatchRequest> makeGetBatches(

	  Collection<GcsPath> paths, List<StorageObjectOrIOException[]> results) throws IOException {

    List<BatchRequest> batches = new ArrayList<>();

    for (List<GcsPath> filesToGet :

        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {

      BatchRequest batch = createBatchRequest();

      for (GcsPath path : filesToGet) {

        results.add(enqueueGetFileSize(path, batch));

      }

      batches.add(batch);

    }

    return batches;

  }



  /**

   * Wrapper for RewriteRequest that supports multiple calls.

   *

   * <p>Usage: create, enqueue(), and execute batch. Then, check getReadyToEnqueue() if another

   * round of enqueue() and execute is required. Repeat until getReadyToEnqueue() returns false.

   */

  class RewriteOp extends JsonBatchCallback<RewriteResponse> {

    private GcsPath from;

    private GcsPath to;

    private boolean readyToEnqueue;

    @VisibleForTesting Storage.Objects.Rewrite rewriteRequest;



    public boolean getReadyToEnqueue() {

      return readyToEnqueue;

    }



    public void enqueue(BatchRequest batch) throws IOException {

      if (!readyToEnqueue) {

        throw new IOException(

            String.format(

                "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",

                from, to, readyToEnqueue));

      }

      rewriteRequest.queue(batch, this);

      readyToEnqueue = false;

    }



    public RewriteOp(GcsPath from, GcsPath to) throws IOException {

      this.from = from;

      this.to = to;

      rewriteRequest =

          storageClient

              .objects()

              .rewrite(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null);

      if (maxBytesRewrittenPerCall != null) {

        rewriteRequest.setMaxBytesRewrittenPerCall(maxBytesRewrittenPerCall);

      }

      readyToEnqueue = true;

    }



    @Override

    public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeaders)

        throws IOException {

      if (rewriteResponse.getDone()) {

        LOG.debug("Rewrite done: {} to {}", from, to);

        readyToEnqueue = false;

      } else {

        LOG.debug(

            "Rewrite progress: {} of {} bytes, {} to {}",

            rewriteResponse.getTotalBytesRewritten(),

            rewriteResponse.getObjectSize(),

            from,

            to);

        rewriteRequest.setRewriteToken(rewriteResponse.getRewriteToken());

        readyToEnqueue = true;

        if (numRewriteTokensUsed != null) {

          numRewriteTokensUsed.incrementAndGet();

        }

      }

    }



    @Override

    public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {

      readyToEnqueue = false;

      throw new IOException(String.format("Error trying to rewrite %s to %s: %s", from, to, e));

    }

  }



  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)

      throws IOException {

    LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames);

    while (rewrites.size() > 0) {

      executeBatches(makeCopyBatches(rewrites));

    }

  }



  LinkedList<RewriteOp> makeRewriteOps(

      Iterable<String> srcFilenames, Iterable<String> destFilenames) throws IOException {

    List<String> srcList = Lists.newArrayList(srcFilenames);

    List<String> destList = Lists.newArrayList(destFilenames);

    checkArgument(

        srcList.size() == destList.size(),

        "Number of source files %s must equal number of destination files %s",

        srcList.size(),

        destList.size());

    LinkedList<RewriteOp> rewrites = Lists.newLinkedList();

    for (int i = 0; i < srcList.size(); i++) {

      final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));

      final GcsPath destPath = GcsPath.fromUri(destList.get(i));

      rewrites.addLast(new RewriteOp(sourcePath, destPath));

    }

    return rewrites;

  }



  List<BatchRequest> makeCopyBatches(LinkedList<RewriteOp> rewrites) throws IOException {

    List<BatchRequest> batches = new ArrayList<>();

    BatchRequest batch = createBatchRequest();

    Iterator<RewriteOp> it = rewrites.iterator();

    while (it.hasNext()) {

      RewriteOp rewrite = it.next();

      if (!rewrite.getReadyToEnqueue()) {

        it.remove();

        continue;

      }

      rewrite.enqueue(batch);



      if (batch.size() >= MAX_REQUESTS_PER_BATCH) {

        batches.add(batch);

        batch = createBatchRequest();

      }

    }

    if (batch.size() > 0) {

      batches.add(batch);

    }

    return batches;

  }



  List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException {

    List<BatchRequest> batches = new ArrayList<>();

    for (List<String> filesToDelete :

        Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) {

      BatchRequest batch = createBatchRequest();

      for (String file : filesToDelete) {

        enqueueDelete(GcsPath.fromUri(file), batch);

      }

      batches.add(batch);

    }

    return batches;

  }



  public void remove(Collection<String> filenames) throws IOException {

    executeBatches(makeRemoveBatches(filenames));

  }



  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch)

      throws IOException {

    final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1];



    Storage.Objects.Get getRequest =

        storageClient.objects().get(path.getBucket(), path.getObject());

    getRequest.queue(

        batch,

        new JsonBatchCallback<StorageObject>() {

          @Override

          public void onSuccess(StorageObject response, HttpHeaders httpHeaders)

              throws IOException {

            ret[0] = StorageObjectOrIOException.create(response);

          }



          @Override

          public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException {

            IOException ioException;

            if (errorExtractor.itemNotFound(e)) {

              ioException = new FileNotFoundException(path.toString());

            } else {

              ioException = new IOException(String.format("Error trying to get %s: %s", path, e));

            }

            ret[0] = StorageObjectOrIOException.create(ioException);

          }

        });

    return ret;

  }



  /** A class that holds either a {@link StorageObject} or an {@link IOException}. */

  @AutoValue

  public abstract static class StorageObjectOrIOException {



    /** Returns the {@link StorageObject}. */

    @Nullable

    public abstract StorageObject storageObject();



    /** Returns the {@link IOException}. */

    @Nullable

    public abstract IOException ioException();



    @VisibleForTesting

    public static StorageObjectOrIOException create(StorageObject storageObject) {

      return new AutoValue_GcsUtil_StorageObjectOrIOException(

          checkNotNull(storageObject, "storageObject"), null /* ioException */);

    }



    @VisibleForTesting

    public static StorageObjectOrIOException create(IOException ioException) {

      return new AutoValue_GcsUtil_StorageObjectOrIOException(

          null /* storageObject */, checkNotNull(ioException, "ioException"));

    }

  }



  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException {

    Storage.Objects.Delete deleteRequest =

        storageClient.objects().delete(file.getBucket(), file.getObject());

    deleteRequest.queue(

        batch,

        new JsonBatchCallback<Void>() {

          @Override

          public void onSuccess(Void obj, HttpHeaders responseHeaders) {

            LOG.debug("Successfully deleted {}", file);

          }



          @Override

          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {

            if (e.getCode() == 404) {

              LOG.info(

                  "Ignoring failed deletion of file {} which already does not exist: {}", file, e);

            } else {

              throw new IOException(String.format("Error trying to delete %s: %s", file, e));

            }

          }

        });

  }



  private BatchRequest createBatchRequest() {

    return storageClient.batch(httpRequestInitializer);

  }



  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {

    // Emit the next character without special interpretation

    dst.append('\\');

    if ((i - 1) != src.length) {

      dst.append(src[i]);

      i++;

    } else {

      // A backslash at the very end is treated like an escaped backslash

      dst.append('\\');

    }

    return i;

  }

}